﻿using RabbitMQ.Gateway;
using RabbitMQ.Gateway.Config;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQ.Utils
{
    /// <summary>
    /// Starts and restarts the asynchronous queue consumers that are listed by the
    /// shared <see cref="GatewayFactory"/>, and keeps track of the processor and
    /// the logger that belong to each consumer name.
    /// </summary>
    /// <remarks>
    /// Consumers are started from <c>Parallel.ForEach</c> bodies, so every access to
    /// the two bookkeeping dictionaries goes through a lock: <c>Dictionary</c> is not
    /// safe for concurrent writers.
    /// </remarks>
    public class RQConsumer
    {
        // Upper bound on how many consumers are (re)subscribed at the same time.
        private const int MaxParallelSubscriptions = 20;

        private static readonly log4net.ILog log = log4net.LogManager.GetLogger("mqframeloger");

        // Guards every read and write of `recevers`.
        private static readonly object receiversLock = new object();

        // Consumer name -> the processor most recently created for that consumer.
        private static Dictionary<string, IAsyncConsumerProcessor> recevers = new Dictionary<string, IAsyncConsumerProcessor>();

        // Guards every read and write of `Logs`.
        private static readonly object logsLock = new object();

        // Consumer name -> logger handed to that consumer's processor.
        private readonly static Dictionary<string, log4net.ILog> Logs = new Dictionary<string, log4net.ILog>();

        /// <summary>
        /// Factory used to create consumers. It is replaced with a fresh instance
        /// when <see cref="ConsumerReStart(bool)"/> is called with <c>true</c>.
        /// </summary>
        public static GatewayFactory factory = new GatewayFactory() ;

        /// <summary>
        /// Builds the options shared by every parallel subscription loop.
        /// </summary>
        private static ParallelOptions CreateParallelOptions()
        {
            return new ParallelOptions() { MaxDegreeOfParallelism = MaxParallelSubscriptions };
        }

        /// <summary>
        /// Returns the logger registered for <paramref name="name"/>, creating it
        /// from <paramref name="loggerName"/> on first use. Check and insert happen
        /// under one lock so concurrent callers cannot add the same key twice.
        /// </summary>
        private static log4net.ILog GetOrAddLogger(string name, string loggerName)
        {
            lock (logsLock)
            {
                log4net.ILog existing;
                if (!Logs.TryGetValue(name, out existing))
                {
                    existing = log4net.LogManager.GetLogger(loggerName);
                    Logs.Add(name, existing);
                }
                return existing;
            }
        }

        /// <summary>
        /// Returns the logger already registered for <paramref name="name"/>.
        /// Every code path in this class registers the logger before it starts the
        /// consumer, so a tracked consumer is expected to have one; a missing entry
        /// throws <see cref="KeyNotFoundException"/>.
        /// </summary>
        private static log4net.ILog GetLogger(string name)
        {
            lock (logsLock)
            {
                return Logs[name];
            }
        }

        /// <summary>
        /// Looks up the processor tracked for <paramref name="name"/>.
        /// </summary>
        /// <returns><c>true</c> when a processor is tracked for that name.</returns>
        private static bool TryGetReceiver(string name, out IAsyncConsumerProcessor receiver)
        {
            lock (receiversLock)
            {
                return recevers.TryGetValue(name, out receiver);
            }
        }

        /// <summary>
        /// Copies the tracked consumers so callers can iterate them while other
        /// threads register replacement processors.
        /// </summary>
        private static List<KeyValuePair<string, IAsyncConsumerProcessor>> SnapshotReceivers()
        {
            lock (receiversLock)
            {
                return new List<KeyValuePair<string, IAsyncConsumerProcessor>>(recevers);
            }
        }

        /// <summary>
        /// Subscribes a single consumer to its queue and records the resulting
        /// processor, replacing any processor previously tracked under the same name.
        /// </summary>
        /// <param name="ReceiverName">Configured name of the consumer to start.</param>
        /// <param name="loger">Logger passed to the created processor.</param>
        /// <param name="Reconnection">Forwarded to the factory; callers in this class pass <c>true</c> when re-subscribing.</param>
        /// <remarks>
        /// Any exception is logged at Fatal level and swallowed, so one failing
        /// consumer does not abort the parallel loop that is starting the others.
        /// </remarks>
        private static void ConsumerStart(string ReceiverName,log4net.ILog loger,bool Reconnection=false)
        {
            try
            {
                // The long-lived shared factory is used on purpose. An earlier version
                // created a factory in a `using` block, and its note recorded that the
                // connection was dropped as soon as that factory was disposed.
                IAsyncConsumerProcessor receiver = factory.GetAsyncReceiver(ReceiverName, loger,RQProducer.EfficientSend, false, Reconnection);
                log.InfoFormat(".......消费者:{0} 完成了队列订阅", ReceiverName);

                lock (receiversLock)
                {
                    // Indexer assignment adds the entry or overwrites the old one.
                    recevers[ReceiverName] = receiver;
                }
            }
            catch (Exception ex)
            {
                log.Fatal(ex);
            }
        }

        /// <summary>
        /// Restarts consumers.
        /// </summary>
        /// <param name="IsAll">
        /// <c>true</c>: replace the factory, flag every tracked processor as closed and
        /// subscribe all configured consumers again.
        /// <c>false</c>: re-subscribe only the tracked consumers that are no longer open,
        /// then start any configured consumer that is not tracked yet.
        /// </param>
        public static void ConsumerReStart(bool IsAll)
        {
            if (IsAll)
            {
                factory = new GatewayFactory();

                // Flag the old processors via their Close property before new ones
                // are created by the fresh factory.
                foreach (KeyValuePair<string, IAsyncConsumerProcessor> tracked in SnapshotReceivers())
                {
                    tracked.Value.Close = true;
                }

                ConsumerStart();
                log.InfoFormat("......所有消费者进行了重新订阅了...........");
                return;
            }

            // Restart consumers that are no longer open. Iterate a snapshot: the
            // loop body writes replacement processors back into the dictionary.
            Parallel.ForEach(SnapshotReceivers(), CreateParallelOptions(), delegate (KeyValuePair<string, IAsyncConsumerProcessor> tracked)
            {
                if (!tracked.Value.IsOpen)
                {
                    RQConsumer.ConsumerStart(tracked.Key, GetLogger(tracked.Key), true);
                    log.InfoFormat("......消费者{0}进行了重新订阅...........", tracked.Key);
                }
            });

            // Start consumers that appear in the configuration but are not tracked yet.
            Dictionary<string,string> restartCol = factory.GetAsyncReceiverList();
            Parallel.ForEach(restartCol.Keys, CreateParallelOptions(), delegate (string name)
            {
                IAsyncConsumerProcessor ignored;
                if (!TryGetReceiver(name, out ignored))
                {
                    log4net.ILog consumerLog = GetOrAddLogger(name, restartCol[name]);
                    RQConsumer.ConsumerStart(name, consumerLog, true);
                    log.InfoFormat("......消费者{0}进行了重新订阅...........", name);
                }
            });
        }

        /// <summary>
        /// Starts every consumer returned by the factory's receiver list, whether or
        /// not a processor is already tracked for it.
        /// </summary>
        /// <remarks>Failures are logged at Fatal level and not rethrown.</remarks>
        public static void ConsumerStart()
        {
            try
            {
                Dictionary<string,string> col = factory.GetAsyncReceiverList();
                Parallel.ForEach(col.Keys, CreateParallelOptions(), delegate (string name)
                {
                    log4net.ILog consumerLog = GetOrAddLogger(name, col[name]);
                    RQConsumer.ConsumerStart(name, consumerLog);
                });
            }
            catch (Exception ex)
            {
                log.Fatal(ex.ToString());
            }
        }

        /// <summary>
        /// Service entry point: applies the <c>IsStopConsumer</c> app setting, then
        /// starts every configured consumer that is not tracked yet and re-subscribes
        /// tracked consumers that are no longer open.
        /// </summary>
        /// <remarks>Failures are logged at Fatal level and not rethrown.</remarks>
        public static void ConsumerServiceStart()
        {
            try
            {
                // Machine-readable flag, so compare ordinally rather than by culture.
                string stopSetting = System.Configuration.ConfigurationManager.AppSettings["IsStopConsumer"] ?? "";
                if (stopSetting.Equals("true", StringComparison.OrdinalIgnoreCase))
                {
                    Control.IsStopConsumer = true;
                }

                Dictionary<string, string> col = factory.GetAsyncReceiverList();
                Parallel.ForEach(col.Keys, CreateParallelOptions(), delegate (string name)
                {
                    log4net.ILog consumerLog = GetOrAddLogger(name, col[name]);

                    IAsyncConsumerProcessor existing;
                    if (!TryGetReceiver(name, out existing))
                    {
                        RQConsumer.ConsumerStart(name, consumerLog);
                    }
                    else if (!existing.IsOpen)
                    {
                        RQConsumer.ConsumerStart(name, consumerLog, true);
                    }
                });
            }
            catch (Exception ex)
            {
                log.Fatal(ex.ToString());
            }
        }
    }
}
